package com.diy.sigmund.service.impl;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.diy.sigmund.config.ExecutorConfig;
import com.diy.sigmund.config.SqlContext;
import com.diy.sigmund.entity.Employee;
import com.diy.sigmund.mapper.EmployeeMapper;
import com.diy.sigmund.service.IEmployeeService;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Resource;
import org.apache.ibatis.session.SqlSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
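 * <p>
 * Service implementation for {@link Employee} records, built on {@link EmployeeMapper}.
 * Besides the inherited MyBatis-Plus operations it contains two demo methods that
 * insert a list of employees from several worker tasks inside one manually managed
 * JDBC transaction.
 * </p>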
 *
 * @author ylm-sigmund
 * @since 2023-03-30
 */
@Service
public class EmployeeServiceImpl extends ServiceImpl<EmployeeMapper, Employee> implements IEmployeeService {

    private static final Logger log = LoggerFactory.getLogger(EmployeeServiceImpl.class);

    /**
     * Splits a list into {@code n} consecutive chunks whose sizes differ by at most one.
     *
     * <p>The first {@code source.size() % n} chunks receive one extra element. When
     * {@code source} has fewer than {@code n} elements the trailing chunks are empty,
     * so the result always contains exactly {@code n} lists.
     *
     * <p>The chunks are {@link List#subList(int, int)} views of {@code source}, not copies;
     * structurally modifying {@code source} afterwards invalidates them.
     *
     * @param source the list to split; must not be {@code null}
     * @param n      the number of chunks to produce; must be positive
     * @param <T>    the element type
     * @return a list of exactly {@code n} sub-list views, in source order
     * @throws IllegalArgumentException if {@code n} is zero or negative
     */
    public static <T> List<List<T>> averageAssign(List<T> source, int n) {
        // A zero n used to fail with an opaque "/ by zero", and a negative n silently
        // returned an empty result (dropping every element); reject both explicitly.
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive, but was " + n);
        }
        int total = source.size();
        int baseSize = total / n;
        int remainder = total % n;
        List<List<T>> result = new ArrayList<>();
        int from = 0;
        for (int i = 0; i < n; i++) {
            // The first `remainder` chunks absorb the leftover elements, one each.
            int to = from + baseSize + (i < remainder ? 1 : 0);
            result.add(source.subList(from, to));
            from = to;
        }
        return result;
    }

    /** Injected via {@code @Resource}; the save methods in this class call {@code getSqlSession()} on it. */
    @Resource
    SqlContext sqlContext;

    /**
     * Multi-threaded transaction test for the failure path: the input is split into five
     * chunks, each chunk is inserted by its own task, and the task for the last chunk
     * deliberately throws so that the whole transaction is rolled back.
     *
     * <p>NOTE(review): all tasks share the single mapper/session/connection obtained here.
     * MyBatis documents {@code SqlSession} as not thread-safe, so confirm that what
     * {@code SqlContext} hands out is safe to use from several pool threads at once.
     *
     * @param employeeList the employees to (attempt to) insert
     * @throws RuntimeException with message {@code 002出现异常} wrapping the underlying
     *                          failure, after the transaction has been rolled back
     * @throws Exception        if rolling back or closing the connection itself fails
     */
    @Override
    public void saveThreadException(List<Employee> employeeList) throws Exception {
        // Obtain the session and the JDBC connection behind it.
        SqlSession sqlSession = sqlContext.getSqlSession();
        Connection connection = sqlSession.getConnection();
        try {
            // Switch to manual commit so all chunk inserts form one transaction.
            connection.setAutoCommit(false);
            EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);
            ExecutorService service = ExecutorConfig.getThreadPool();

            List<List<Employee>> lists = averageAssign(employeeList, 5);
            int lastIndex = lists.size() - 1;
            List<Callable<Integer>> callableList = new ArrayList<>(lists.size());
            for (int i = 0; i < lists.size(); i++) {
                // Capture the decision per task. A flag shared by all tasks would already
                // be flipped by the time invokeAll runs them, making every task throw.
                final boolean failThisTask = i == lastIndex;
                List<Employee> list = lists.get(i);
                if (!failThisTask && list.isEmpty()) {
                    // Fewer rows than chunks: nothing to insert for this chunk.
                    continue;
                }
                callableList.add(() -> {
                    // Only the last task throws; the others really insert their chunk.
                    if (failThisTask) {
                        throw new RuntimeException("001出现异常");
                    }
                    return employeeMapper.saveBatch(list);
                });
            }

            // Run all tasks and wait for them to finish.
            List<Future<Integer>> futures = service.invokeAll(callableList);
            for (Future<Integer> future : futures) {
                // A chunk that inserted nothing rolls back everything.
                if (future.get() <= 0) {
                    connection.rollback();
                    return;
                }
            }
            connection.commit();
            System.out.println("添加完毕");
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag for callers further up the stack.
                Thread.currentThread().interrupt();
            }
            log.error("error", e);
            try {
                connection.rollback();
            } catch (SQLException rollbackFailure) {
                // Keep the original failure visible instead of letting the rollback error replace it.
                rollbackFailure.addSuppressed(e);
                throw rollbackFailure;
            }
            throw new RuntimeException("002出现异常", e);
        } finally {
            connection.close();
        }
    }

    /**
     * Multi-threaded transaction test for the success path: the input is split into up to
     * five chunks, each non-empty chunk is inserted by its own task, and the transaction is
     * committed only if every task reports at least one inserted row.
     *
     * <p>Per the original author's note, what happens to the connection on close can be
     * traced in {@code com.zaxxer.hikari.pool.ProxyConnection#close()} and
     * {@code com.zaxxer.hikari.pool.PoolBase#resetConnectionState(...)}.
     *
     * <p>NOTE(review): all tasks share the single mapper/session/connection obtained here.
     * MyBatis documents {@code SqlSession} as not thread-safe, so confirm that what
     * {@code SqlContext} hands out is safe to use from several pool threads at once.
     *
     * @param employeeDOList the employees to insert
     * @throws RuntimeException with message {@code 002出现异常} wrapping the underlying
     *                          failure, after the transaction has been rolled back
     * @throws SQLException     if rolling back or closing the connection itself fails
     */
    @Override
    public void saveThread(List<Employee> employeeDOList) throws SQLException {
        // Obtain the session and the JDBC connection behind it.
        SqlSession sqlSession = sqlContext.getSqlSession();
        Connection connection = sqlSession.getConnection();
        try {
            // Switch to manual commit so all chunk inserts form one transaction.
            connection.setAutoCommit(false);
            EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);
            ExecutorService service = ExecutorConfig.getThreadPool();

            List<Callable<Integer>> callableList = new ArrayList<>();
            for (List<Employee> list : averageAssign(employeeDOList, 5)) {
                // With fewer than five rows some chunks are empty. Submitting them would
                // either fail or report 0 rows and roll back the valid chunks too.
                if (list.isEmpty()) {
                    continue;
                }
                callableList.add(() -> employeeMapper.saveBatch(list));
            }

            // Run all tasks and wait for them to finish.
            List<Future<Integer>> futures = service.invokeAll(callableList);
            for (Future<Integer> future : futures) {
                // A chunk that inserted nothing rolls back everything.
                if (future.get() <= 0) {
                    connection.rollback();
                    return;
                }
            }
            connection.commit();
            System.out.println("添加完毕");
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag for callers further up the stack.
                Thread.currentThread().interrupt();
            }
            log.error("error", e);
            try {
                connection.rollback();
            } catch (SQLException rollbackFailure) {
                // Keep the original failure visible instead of letting the rollback error replace it.
                rollbackFailure.addSuppressed(e);
                throw rollbackFailure;
            }
            throw new RuntimeException("002出现异常", e);
        } finally {
            connection.close();
            System.out.println("connection close");
        }
    }
}
